教程

您所在的位置:网站首页 airflow webserver教程 教程

教程

2023-03-12 11:38| 来源: 网络整理| 查看: 265

教程#

贡献者:@ImPerat0R_、@ThinkingChen、@Ray、@zhongjiajie

本教程将向您介绍一些 Airflow 的基本概念、对象以及它们在编写第一个 pipline(管道)时的用法。

定义 Pipeline(管道)的例子#

以下是定义一个基本 pipline(管道)的示例。如果这看起来很复杂,请不要担心,下面将逐行说明。

""" Airflow 教程代码位于: https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) # t1、t2 和 t3 是通过实例化 Operators 创建的任务示例 t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ { % for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" { % end for %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1) 这是一个 DAG 定义文件#

有一件事需要考虑(一开始可能不是很直观),这个 Airflow 的 Python 脚本实际上只是一个将 DAG 的结构指定为代码的配置文件。此处定义的实际任务将在与此脚本定义的不同上下文中运行。不同的任务在不同的时间点运行在不同的 worker(工作节点)上,这意味着该脚本不能在任务之间交叉通信。请注意,为此,我们有一个名为XCom的更高级功能。

人们有时会将 DAG 定义文件视为可以进行实际数据处理的地方 - 但事实并非如此!该脚本的目的是定义 DAG 对象。它需要快速评估(秒,而不是几分钟),因为 scheduler(调度器)将定期执行它以反映更改(如果有的话)。

导入模块#

一个 Airflow 的 pipeline 就是一个 Python 脚本,这个脚本的作用是为了定义 Airflow 的 DAG 对象。让我们首先导入我们需要的库。

# DAG 对象; 我们将需要它来实例化一个 DAG from airflow import DAG # Operators; 我们需要利用这个对象去执行流程! from airflow.operators.bash_operator import BashOperator 默认参数#

我们即将创建一个 DAG 和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这可能变得多余),或者(最好地)我们可以定义一个默认参数的字典,这样我们可以在创建任务时使用它。

from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), }

有关 BaseOperator 参数及其功能的更多信息,请参阅airflow.models.BaseOperator文档。

另外,请注意,您可以轻松定义可用于不同目的的不同参数集。一个典型的例子是在生产和开发环境之间进行不同的设置。

实例化一个 DAG#

我们需要一个 DAG 对象来嵌入我们的任务。这里我们传递一个定义为dag_id的字符串,把它用作 DAG 的唯一标识符。我们还传递我们刚刚定义的默认参数字典,同时也为 DAG 定义schedule_interval,设置调度间隔为每天一次。

dag = DAG( 'tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) (Task)任务#

在实例化 operator(执行器)时会生成任务。从一个 operator(执行器)实例化出来的对象的过程,被称为一个构造方法。第一个参数task_id充当任务的唯一标识符。

t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)

注意到我们传递了一个 BaseOperator 特有的参数(bash_command)和所有的 operator 构造函数中都会有的一个参数(retries)。这比为每个构造函数传递所有的参数要简单很多。另请注意,在第二个任务中,我们使用3覆盖了默认的retries参数值。

任务参数的优先规则如下:

明确传递参数 default_args字典中存在的值 operator 的默认值(如果存在)

任务必须包含或继承参数task_id和owner,否则 Airflow 将出现异常。

使用 Jinja 作为模版#

Airflow 充分利用了Jinja Templating的强大功能,并为 pipline(管道)的作者提供了一组内置参数和 macros(宏)。Airflow 还为 pipline(管道)作者提供了自定义参数,macros(宏)和 templates(模板)的能力。

本教程几乎没有涉及在 Airflow 中使用模板进行操作的工作领域,但本节的目的是让您知道此功能的存在,让您熟悉{{ }}双花括号的用途,并指出最常见的模板变量: {{ ds }} (今天的“日期戳”)。

templated_command = """ { % f or i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7) }}" echo "{{ params.my_param }}" { % e ndfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag)

请注意,templated_command包含{% %}块中的代码逻辑,引用参数如{{ ds }},调用函数方式如{{ macros.ds_add(ds, 7)}},引用用户定义的参数如{{ params.my_param }}。

在BaseOperator中的paramshook 允许您将参数或对象的字典传递给您的模板。请花一些时间去了解my_param这个参数是如何在模板中被使用的。

文件也可以当做bash_command的参数进行传递,例如bash_command='templated_command.sh',不过这个文件的位置要在 pipeline(管道)文件的目录内(在本例中为tutorial.py)。这可能是出于多种原因,比如将脚本的逻辑和 pipeline 代码分隔开,允许在使用不同语言编写的文件中进行正确的代码突出显示,以及灵活地构建 pipeline(管道)。还可以定义您的template_searchpath,以指向 DAG 构造函数调用中的任何文件夹位置。

使用同样的 DAG 构造函数调用,可以使用user_defined_macros来定义您自己的变量。例如,将dict(foo='bar')传递给此参数允许您在模板中使用{{ foo }} 。此外,允许您指定user_defined_filters来注册自己的过滤器。例如,将dict(hello=lambda name: 'Hello %s' % name)传递给此参数可以允许您在你的模板中使用{{ 'world' | hello }}。有关自定义过滤器的更多信息,请查看Jinja 文档

有关可以在模板中引用的变量和宏的更多信息,请务必阅读宏部分

设置依赖关系#

我们有三个不相互依赖任务,分别是t1,t2,t3。以下是一些可以定义它们之间依赖关系的方法:

t1.set_downstream(t2) # 这意味着 t2 会在 t1 成功执行之后才会执行 # 与下面这种写法相等 t2.set_upstream(t1) # 位移运算符也可用于链式运算 # 用于链式关系 和上面达到一样的效果 t1 >> t2 # 位移运算符用于上游关系中 t2 > t2 >> t3 # 任务列表也可以设置为依赖项。 # 下面的这些操作都具有相同的效果: t1.set_downstream([t2, t3]) t1 >> [t2, t3] [t2, t3]


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3